(到了29篇,突然不知道要寫什麼了...想到我自己之前對 batch process 的筆記也可以來改寫成一篇哈...)
透過前面一系列的 demo,我們知道Opentelemetry 這個開源標準主要是採集 Trace、Metric、Log 三種類型的遙測資料,並且大致了解它們的採集邏輯、並手寫實現了相應的 mini-sdk。此外,我們同時也 demo 了有關 Sentry 在錯誤捕捉、前端效能追蹤、全鏈路請求監控等,以及也手寫 demo 了相關邏輯。
然而在上述的情境中,其實很容易在很短的時間內發起許多請求、把相關數據傳送到指定服務。這麼一來,無論是在前端還是後端,在處理自身業務邏輯之外,還需要負擔觀測數據的請求傳送,可能會造成網路延遲、或資源被佔用。
這時,我們可以看到在 SDK 中,有 BatchProcessor這個處理方式,透過批量處理請求來優化這個問題。
批量處理請求,代表一次處理一連串的請求。首先我們會將請求放在一個 Buffer 中(通常使用 queue ),等到某些條件達成的時候再一次處理。
而這個任務積壓也不能太多,不然會讓程式一瞬間要處理太多任務。通常會設一個 queue 最大值,如果積壓任務的數量大過最大值時,就要立即處理、或者丟棄新任務。
另外,如果請求一直沒有辦法達到最大值的話,是不是就沒辦法觸發處理請求?這樣的結果也是沒辦法接受的,所以往往還需要另外設定一個定時器,來定時處理在 queue 中積壓的請求。
因此,通常主要有兩個觸發批量處理的時刻:
而在 Opentelemetry-js 中的 BatchSpanProcessorBase
和BatchLogRecordProcessorBase
,有定義自己的觸發邏輯:當有 task (span / log)進入 buffer queue 的時候,就會考慮要不要 export:
setTimeout
來異步 flush。
maxExportBatchSize
maxQueueSize
scheduledDelayMillis
exportTimeoutMillis
然後加上 Buffer 以及一些 flag, 該 BatchProcessor 構造器為
class BatchProcessor {
constructor(exporter) {
this._exporter = exporter;
this._maxExportBatchSize = 5;
this._maxQueueSize = 10;
this._scheduledDelayMillis = 1000; //ms
this._exportTimeoutMillis = 5000; //ms
this._buffer = [];
this._timer = undefined;
this._isExporting = false; //避免重複導出
}
}
const timer = setTimeout(() => {
reject(new Error('Timeout'));
}, this._exportTimeoutMillis);
maxExportBatchSize
, 然後給 exporter 處理let taskItems;
if (this._buffer.length <= this._maxExportBatchSize) {
taskItems = this._buffer;
this._buffer = [];
} else {
taskItems = this._buffer.splice(0, this._maxExportBatchSize);
}
this._exporter(taskItems, () => {
clearTimeout(timer);
resolve();
});
flush
函數,如果執行完 batch 後,Buffer裡面還有任務,那麼就再執行一次:const flush = () => {
...
this._flushOneBatch().finally(() => {
...
if (this._buffer.length > 0) {
...
this._maybeStartTimer();
}
});
};
...
// 如果 buffer 內大於單次批次大小,立即flush---加速
if (this._buffer.length >= this._maxExportBatchSize) {
return flush();
}
if (this._timer !== undefined) return;
// 允許延遲,讓更多任務進入 buffer;
this._timer = setTimeout(() => flush(), this._scheduledDelayMillis);
我們先 mock 一個 exporter,它的作用就是打印所有的 task 名稱:
const mockExporter = (tasks, callback) => {
Promise.all(
tasks.map((task) => {
return new Promise((resolve) => {
console.log(task);
resolve();
});
}),
).finally(() => {
console.log('---finish export!!');
callback();
});
};
然後生成 30 個任務,為每個任務生成一個隨機的延遲時間,以模擬不同時間點的任務進入 BatchProcessor。
mockTasks.forEach((task, index) => {
setTimeout(() => {
console.log(`Pushing ${task} into the batch processor`);
batchProcessor.addToBuffer(task);
}, randomDelay() * index); // 隨機延遲推送任務到 processor
});
結果如下:
我們可以看到,輸出結果跟我們設定的一樣,任務為5個一個數量單位輸出,實現了簡易的批量處理!
在本文中,我們依照 opentelemetry-js 中的 batch process 邏輯,實現了邏輯相近的批量處理。不過這個處理方式是在 SDK 中減少請求的次數,進而減緩 collector、處理遙測數據 server 的負擔。
但實際上,如果數據量上來,還是需要其他的優化手段如:
所以除了在 SDK 中使用批量處理,還需要視實際業務情況來增加額外的優化手段。
本文程式碼可以在此 Github repository 中查看。